昨天本喵說到,為了避免太快送出 Indicate 導致的失敗 (在還沒收到前一筆 Indicate 的 Confirm 前,就送出下一筆 Indicate),所以會製作一個簡易的流量管控,現在咱們就開始吧~
咱們讓 BleTxScheduler
類別負責管理 Notify 和 Indicate 的送出。這邊要稍微說明一下 Notify 是什麼,其實它和 Indicate 的機制很像,只是送出後,不需要等待 GATT Client 的 Confirm。所以基本上可以不管不顧地拼命送,但對方是否真的有收到就未可知了。
咱們在 BleTxScheduler
使用了單例模式,這表示無論呼叫 BleTxScheduler()
多少次,都會得到同一實例:
class BleTxScheduler:
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, size=8) -> None:
if self._initialized:
return
# 儲存執行 Notify/Indicate 的函數與其參數
self._queue_ntf_ind: collections.deque[tuple] = collections.deque((), size)
# 通知系統可以送出下一筆 Indication
self._ind_done_flag = asyncio.ThreadSafeFlag()
register_irq_handler(self._ble_isr)
self._initialized = True
為什麼要使用單例模式呢?因為所有要執行 Notify 和 Indicate 的地方都需要它,若不用單例模式,那每個要使用的模組或類別都必須傳入它的一個參考,這將花費不少記憶體,而且也會增加模組和類別的介面的複雜度。
當然咱們也不一定需要單例模式,還記得 IddServer
類別嗎?當時咱們就只是用個全域變數來使用它。那麼在 BleTxScheduler
上,若使用全域變數,是否可以減少記憶體的使用量呢?咱們可以類似這樣設計:
# ble/stack.py
bleTxScheduler = None
class BleTxScheduler:
@classmethod
def init_inst(cls):
global bleTxScheduler
if bleTxScheduler is None:
bleTxScheduler = cls()
# ble/server.py
class IdsServer(ble.stack.Server):
def __init__(self):
ble.stack.BleTxScheduler.init_inst()
async def run(self):
await ble.stack.bleTxScheduler.run()
async def test():
ble.stack.bleTxScheduler.add(
ble.stack.ACT_INDICATE,
indicate,
instance._idd_status_changed.value_handle,
data,
)
測試結果如下:
MicroPython 1.25.0 | 單例模式 | 全域變數 | 說明 |
---|---|---|---|
Free Heap RAM (bytes) | 135648 | 134688 | 愈大愈好 |
max new split | 102400 | 102400 | 愈大愈好 |
No. of 1-blocks | 304 | 324 | 愈小愈好 |
No. of 2-blocks | 86 | 91 | 愈小愈好 |
max blk sz | 72 | 72 | 愈大愈好 |
max free sz | 1051 | 922 | 愈大愈好 |
MicroPython 1.26.0 | 單例模式 | 全域變數 | 說明 |
---|---|---|---|
Free Heap RAM (bytes) | 138928 | 137776 | 愈大愈好 |
max new split | 106496 | 106496 | 愈大愈好 |
No. of 1-blocks | 320 | 340 | 愈小愈好 |
No. of 2-blocks | 91 | 96 | 愈小愈好 |
max blk sz | 72 | 72 | 愈大愈好 |
max free sz | 924 | 865 | 愈大愈好 |
可以看到,在 BleTxScheduler
,單例模式在記憶體使用上,可說是完勝全域變數,但這並非通例。比如 IdsServer
類別,當 BleTxScheduler
使用單例模式時,IdsServer
在兩種模式下各有勝負:
MicroPython 1.25.0 | 單例模式 | 全域變數 | 說明 |
---|---|---|---|
Free Heap RAM (bytes) | 135168 | 135648 | 愈大愈好 |
max new split | 102400 | 102400 | 愈大愈好 |
No. of 1-blocks | 304 | 304 | 愈小愈好 |
No. of 2-blocks | 85 | 86 | 愈小愈好 |
max blk sz | 72 | 72 | 愈大愈好 |
max free sz | 1042 | 1051 | 愈大愈好 |
MicroPython 1.26.0 | 單例模式 | 全域變數 | 說明 |
---|---|---|---|
Free Heap RAM (bytes) | 138544 | 138928 | 愈大愈好 |
max new split | 106496 | 106496 | 愈大愈好 |
No. of 1-blocks | 319 | 320 | 愈小愈好 |
No. of 2-blocks | 90 | 91 | 愈小愈好 |
max blk sz | 72 | 72 | 愈大愈好 |
max free sz | 908 | 924 | 愈大愈好 |
因為綜合來看,IdsServer
使用全域變數方案時,在記憶體使用上看似更好,所以先前才會讓其以全域變數存在。
如先前所述,這些測試都只是一種參考而已,有時一點小變動,就會使結果完全反轉。所以除非真有必要,不要太執著於此,而是先專注於整體的規劃。
BleTxScheduler
類別有個成員變數 _queue_ntf_ind
,它負責儲存要執行 Notify 或 Indicate 所需的資訊,現在就是將這些資訊儲存到 _queue_ntf_ind
:
def add(self, type: int, fn, value_handle, arg):
# 佇列項目型態為 tuple。
# type: 指示此操作是 Notify 或 Indicate。
# fn: 執行 Notify/Indicate 的實作。
# fn(conn_handle: int | None, value_handle: int, arg) -> bool
# value_handle: 相關 Characteristic 的 handle
# arg: fn 需要的參數。
# 此函數只能在主執行緒被呼叫,否則將有同步問題。
self._queue_ntf_ind.append((type, fn, value_handle, arg))
咱們讓此函數只能在主執行緒被執行,亦即不能在中斷函數裡呼叫。因為 Notify 和 Indicate 都是 GATT Server 主動送出訊息,所以本就不必特地在中斷裡去呼叫。當然,是可以設計成在計時器中斷裡送出 Notify / Indicate,不過為了讓計時器中斷處理程序可以儘快完成,所以會用 micropython.schedule()
來安排執行。
為什麼 fn
所接收的參數是 conn_handle
、value_handle
和 arg
呢?其實這是依賴於 ble.stack.indicate(conn_handle, value_handle, data)
和 ble.stack.notify(conn_handle, value_handle, data)
。那為什麼是用 arg
這個名稱,而不是 data
呢?這是因為咱們預期 fn
會利用 arg
來創建要傳輸的資料。也就是 fn
會先藉由 arg
建立傳送的資料後,再使用 ble.stack.indicate()
或 ble.stack.notify()
傳送資料。
接下來就是讓排程器不斷由佇列裡取出要送出的項目,若是 Indicate,就等待 _ind_done_flag
因收到 _IRQ_GATTS_INDICATE_DONE 而解除等待:
async def run(self):
"""因 _queue_ntf_ind 只會在主執行緒被存取,所以無須同步。"""
while True:
while self._queue_ntf_ind:
type, fn, value_handle, arg = self._queue_ntf_ind.popleft()
successful = fn(_conn_handle, value_handle, arg)
if successful and type == ACT_INDICATE:
try:
await asyncio.wait_for(self._ind_done_flag.wait(), 5)
except asyncio.TimeoutError:
common.logger.write("Indicate timeout!")
await asyncio.sleep_ms(100)
def _ble_isr(self, event, data):
if event == _IRQ_GATTS_INDICATE_DONE:
self._ind_done_flag.set()
可能有看官會覺得為什麼要在佇列為空的時候,使用 sleep_ms()
?這樣不是浪費執行片段嗎?其實完全可以如下這樣做:
def __init__(self, size=8) -> None:
# 用來通知 _queue_ntf_ind 不為空
self._new_item_event = asyncio.Event()
def add(self, type: int, fn, value_handle, arg):
self._queue_ntf_ind.append((type, fn, value_handle, arg))
self._new_item_event.set()
async def run(self):
"""因 _queue_ntf_ind 只會在主執行緒被存取,所以無須同步。"""
while True:
while self._queue_ntf_ind:
...
# 如果沒有資料,就等到有資料為止
self._new_item_event.clear()
await self._new_item_event.wait()
但這只能在 micropython 1.26.0 運作,在 micropython 1.25.0 會無法啟用 run()
協程,這可能是 micropython 1.25.0 的問題。
因 BleTxScheduler
已是單例,所以只要如下使用即可:
class IdsServer(ble.stack.Server):
async def run(self):
await ble.stack.BleTxScheduler().run()
async def test():
for i in range(8):
data = bytearray(1)
data[0] = i + 1
ble.stack.BleTxScheduler().add(
ble.stack.ACT_INDICATE,
indicate,
instance._idd_status_changed.value_handle,
data,
)
Notify / Indicate 排程完成了,明天咱們就能將它應用到 IDD Statys Changed characteristic 上。